文章目录
  1. 1. RetroLambda
  2. 2. ReactiveX
  3. 3. ReactiveX,异步客户端
  4. 4. 什么是RxJava
  5. 5. Observables&Observers
  6. 6. 通信组件
  7. 7. Retrofit&RxJava
  8. 8. Schedulers(调度器)
  9. 9. Operators(操作者)
  10. 10. 错误处理

原文:When Iron Man becomes reactive, RxJava

一个像Rxjava这样的ReactiveX框架,可以帮助你轻松处理在不同的线程上运行的任务。在android上,这经常是一个棘手的问题。

这篇文章也关注operators如何让开发任务更高效,Reactive Extensions提供很多很多operators让你的生活更容易。
惯例,所有的代码都放在了Github,欢迎评论,issue或拍砖!

上一篇我们讲了Dagger2,接下来我们会看到更少的耦合和更好的扩展性。

RetroLambda

有时候,在java大型项目里,或者在大型框架里,像android这种,很难去使用java8的特性,比如Lambda表达式.Retrolambda就是用来解决这个问题的。

1
2
dependencies {
classpath 'me.tatarka:gradle-retrolambda:3.1.0'

1
2
3
4
5
6
7
8
9
10
11
12
 apply plugin: 'me.tatarka.retrolambda'

android {

...

compileOptions {
sourceCompatibility JavaVersion.VERSION_1_8
targetCompatibility JavaVersion.VERSION_1_8
}
}
}

RetroLambda使你写更少的样板代码,同样让你的代码更清晰易懂。看这个例子:

不用RetroLambda

1
2
3
4
5
6
7
Observable.just("Hello, world!")
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
});

用了RetroLambda

1
2
3
Observable.just("Hello, world!") .subscribe(
s -> System.out.println(s)
);

在复仇者联盟的例子里

1
2
3
4
5
6
7
8
9
10
11
12
mCharacterSubscription = mGetCharacterInformationUsecase
.execute().subscribe(
character -> onAvengerReceived(character),
error -> manageError(error)
);


mComicsSubscription = mGetCharacterComicsUsecase
.execute().subscribe(
comics -> Observable.from(comics).subscribe(
comic -> onComicReceived(comic)),

error -> manageError(throwable)
);

ReactiveX

ReactiveX的主要原则是观察者模式,迭代器模式和函数式编程。

ReactiveX也用来异步编程,实际上使用它你可以非常容易的实现异步任务。

ReactiveX,异步客户端

ReactiveX的一大作用就是你可以用它写一个完整的异步api或客户端,然后在实现时决定是使用异步还是线程还是同步。

所以我们使用observable API而不是阻塞的API.

1
2
3
4
public interface Usecase<T> {

Observable<T> execute();
}

1
2
3
4
5
6
public interface Repository {

Observable<Character> getCharacter (final int characterId);

Observable<List<Comic>> getCharacterComics (final int characterId);
}

什么是RxJava

RxJavaNetflix开发的一个Reactive Extensions的实现。

Observables&Observers

一个Observable(被观察者)发出一个或者多个对象,这些对象被订阅了ObservableObserver(观察者)消费或接收。

Observer必须向Observable注册,当Observer注册之后,就创建了一个Subscription对象,这个对象用来从Observableunsubscribe(取消订阅),这对于ActivityFragment里的onStop或者onPause是非常有用的。比如:

1
2
mCharacterSubscription = mGetCharacterInformationUsecase
.execute().subscribe( ... );

1
2
3
4
5
6
7
8
9
@Override
public void onStop() {

if (!mCharacterSubscription.isUnsubscribed())
mCharacterSubscription.unsubscribe();

if (!mComicsSubscription.isUnsubscribed())
mComicsSubscription.unsubscribe();
}

无论何时ObserverObservable注册,都要实现三个方法:

  • onNext(T)接收Observale发出的对象
  • onError(Exception)当发生错误时调用这个方法
  • onCompleted()Observable完成发出对象时调用这个方法

通信组件

让我们看一看怎么使用GetCharacterInformationUsecase用例。所有的用例都实现接口Usercase<T>:

1
2
3
4
public interface Usecase<T> {

Observable<T> execute();
}

当这个用例运行的时候,它返回一个Observable对象,这对于链接observable&operators非常有用,我们一会会看到这些operators的强大力量。

当我们运行GetCharacterInformationUsecase,我们告诉repository去发送一个数据请求:

1
2
3
4
5
6
@Override
public Observable<Character> execute() {

return mRepository.getCharacter(mCharacterId);
// .awesomeRxStuff();
}

我们的AvengerDetailPresenter会是这个用例的Observer,它订阅Observable发出的事件。我们用subscribe来实现订阅,把ObserverObservable连接起来。
onNextonError方法用来管理操作的结果。onCompleted方法在本例中没有实现因为不必要。

1
2
3
4
mCharacterSubscription = mGetCharacterInformationUsecase
.execute().subscribe(
character -> onAvengerReceived(character),
error -> manageError(error));

Retrofit&RxJava

Square开发的Retrofit支持Observable类型,所以请求可以被Observer观察和被operator转换。

你必须知道在哪儿调用它,Retrofit在你的Observable的线程上执行请求,所以如果你从UI线程调用它就会产生错误。让我们谈一谈Schedulers.

Schedulers(调度器)

你可以用不同的线程,一个Thread Executor,或者预设的Schedulers,比如,对于输入和输出操作有Schedulers.io().

1
2
3
4
5
6
7
@Override
public Observable<Character> execute() {


return mRepository.getCharacter(mCharacterId)
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread());
}

这个例子证明了Rx为管理多线程带来的方便,在android开发中,多线程经常是个麻烦的事情。

Operators(操作者)

Operators是ReactiveX的重中之重,用来操纵,转换或者连接由Observable发出的对象。
想一下,一个角色的漫画列表,漫画有特定的年份,而我们想显示某一年的漫画。ReactiveX来帮我们了!

我们使用operator filter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public Observable<Comic> filterByYear(String year) {

if (mComics != null) {
return Observable.from(mComics).filter(
comic -> {
for (ComicDate comicDate : comic.getDates())
if (comicDate.getDate().startsWith(year))
return true;

return false;
});

}

return null;
}

错误处理

Rx operators可以帮我们节省时间提高生产力的另一个例子就是,它的error handling operators.

想一想,如果一个用户这在使用网络请求,但是不巧他正在地铁隧道里,这是网络连接会受影响。

当我们收到一个Retrofit发出的SocketTimeoutException,我们可以使用operator retry

retry会接收一个断言,如果我们返回true,那么Rx就再次发出一个Observable让Retrofit在为我们请求一次。

如果'SocketTimeoutExceptions达到了最大次数,就会执行onError去处理错误了。

1
2
3
4
5
6
7
8
9
10
11
12
@Override
public Observable<List<Comic>> getCharacterComics(int characterId) {

final String comicsFormat = "comic";
final String comicsType = "comic";

return mMarvelApi.getCharacterComics(
characterId, comicsFormat, comicsType)
.retry((attemps, error) ->
error instanceof SocketTimeoutException &&
attemps < MAX_ATTEMPS);
}

文章目录
  1. 1. RetroLambda
  2. 2. ReactiveX
  3. 3. ReactiveX,异步客户端
  4. 4. 什么是RxJava
  5. 5. Observables&Observers
  6. 6. 通信组件
  7. 7. Retrofit&RxJava
  8. 8. Schedulers(调度器)
  9. 9. Operators(操作者)
  10. 10. 错误处理